Celery-分布式任务队列

Celery 是一个简单、灵活且可靠的,处理大量消息的分布式系统,并且提供维护这样一个系统的必需工具。

它是一个专注于实时处理的任务队列,同时也支持任务调度。

原理

在celery 中主要有4个角色,productor ,broker,worker,backend

productor 作为生产者发布任务

broker 是消息队列 用于存储任务 官方推荐使用 RabbitMQ 或者redis

worker 消费者, 任务处理逻辑

backend 处理结果

5c9c435e7e435

如何使用

配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from kombu import Exchange, Queue
# broker 连接,这里使用redis
BROKER_URL = "redis://172.16.101.24:6379/1"
# BACKEND 连接,这里使用redis
CELERY_RESULT_BACKEND = "redis://172.16.101.24:6379/2"
# 任务序列化方式
CELERY_TASK_SERIALIZER = "json"
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_RESULT_EXPIRES = 24 * 60 * 60

# 如果任务没有在 可见性超时 内确认接收,任务会被重新委派给另一个职程并执行。
BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 43200}

# 内存泄漏
# 长时间运行Celery有可能发生内存泄露,可以像下面这样设置

CELERYD_CONCURRENCY = 20 # 并发worker数

CELERYD_MAX_TASKS_PER_CHILD = 40 # 每个worker执行了多少任务就会死掉 防止内存溢出


CELERY_QUEUES = (
Queue("default", Exchange("default"), routing_key="default"),
Queue("multiplication_task_queue", Exchange("multiplication_task"), routing_key="multiplication_task"),
Queue("sum_all", Exchange("sum_all"), routing_key="sum_all"),
Queue("add_task_queue", Exchange("add_task"), routing_key="add_task")
)
# 定义每个任务使用的队列,不同的任务,最好使用不同的队列 以免混乱
CELERY_ROUTES = {
'task.multiplication_task': {"queue": "multiplication_task_queue", "routing_key": "multiplication_task"},
'task.sum_all': {"queue": "sum_all", "routing_key": "sum_all"},
'task.add_task': {"queue": "add_task_queue", "routing_key": "add_task"}}
# celery -A proj worker --loglevel=INFO --concurrency=10 -n worker1.%h -Q for_task
# 定时任务配置
CELERYBEAT_SCHEDULE = {
'add-every-30-seconds': {
'task': 'tasks.add',
'schedule': timedelta(seconds=30),
'args': (16, 16)
},
}

worker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from celery import Celery, platforms, Task, chain
import time

platforms.C_FORCE_ROOT = True

app = Celery()
# 引入配置
app.config_from_object("celeryconfig")


class CalculationTask(Task):
# 任务成功回调
def on_success(self, retval, task_id, args, kwargs):
print 'task done: {0}'.format(retval)
return super(CalculationTask, self).on_success(retval, task_id, args, kwargs)
# 任务失败回调
def on_failure(self, exc, task_id, args, kwargs, einfo):
print 'task fail, reason: {0}'.format(exc)
return super(CalculationTask, self).on_failure(exc, task_id, args, kwargs, einfo)


# 不关心结果
# @app.task(ignore_result=True)

# bind=True 时 Task 本身会作为第一个参数 注入到 函数中
@app.task(bind=True, max_retries=3, default_retry_delay=1 * 6)
def multiplication_task(self, x, y):
try:
time.sleep(1)
# raise Exception("test")
return x * y
except Exception as exc:
# 异常重试 这里countdown 的优先级大于 default_retry_delay
self.retry(exc=exc, countdown=10)


@app.task(base=CalculationTask)
def add_task(x, y):
time.sleep(1)
return x + y


@app.task
def sum_all(x=None):
return sum(x)

进入work 所在的目录 执行命令 celery -A task worker –loglevel=INFO -n worker1.%h

worker 就启动起来了(可以启动多个, 注意修改一下名字作区分)

调度任务(product)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from task import *
from celery import group, chord
import time

start = time.time()
addresult = [add_task.delay(i, i) for i in range(10)]
mult_result = [multiplication_task.delay(i, i) for i in range(20)]

for re in addresult:
print "加法:", re.get()
for re in mult_result:
print "乘法:", re.get()
print "cost:{}".format(str(time.time() - start))


result = add_task.apply_async((2, 2), link=[add_task.s(16), add_task.s(15)])
print result.get()
print result.children[0].get()


# chain
# 串行调用

# chain 函数接受一个任务的列表,Celery 保证一个 chain 里的子任务会依次执行,
# 在 AsynResult 上执行 get 会得到最后一个任务的返回值。和 link 功能类似,
# 每一个任务执行结果会当作参数传入下一个任务,所以如果你不需要这种特性,采用 immutable signature (si)来取消。

result = chain(add_task.s(1, 2), add_task.s(3), add_task.s(4))()
# result = chain(add_task.si(1, 2), add_task.si(3, 3), add_task.si(4, 4))()
print result.get()
print result.parent.parent.graph


# Groups
# 并行调用
#
result = group(add_task.s(1, 2), add_task.s(2, 3), add_task.s(2, 4))()
print result.get()

# Chords
# 先并行调用,再串行调用 4+8+16
# 两种写法等价
# result=chord((add_task.s(2, 2), add_task.s(4, 4), add_task.s(8, 8)), sum_all.s())()
result = chain(group(add_task.s(2, 2), add_task.s(4, 4), add_task.s(8, 8)), sum_all.s())()
print result.get()

# map
# 对并行调用的结果各自汇总
result = sum_all.map([range(10), range(100)]).delay()
print result.get()


# Starmap
# 对并行调用的结果各自汇总,汇总参数是tuple
result = add_task.starmap(zip(range(10), range(10))).delay()
print result.get()